[Hadoop] 分布式处理框架MapReduce 二

MapReduce 源代码解析(Hadoop Mapper 定义、Hadoop Reducer 定义、Hadoop Partitioner 定义和默认实现)、MapReduce执行机制、MapReduce 容错性、数据本地性问题、参数调优

Posted by 李玉坤 on 2017-06-17

MapReduce 源代码解析

Hadoop Mapper 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}

Hadoop Reducer 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
}

Hadoop Partitioner 定义和默认实现

Partitioner抽象类定义

1
2
3
4
5
public abstract class Partitioner<KEY, VALUE> {

public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

默认HashPartitioner实现

1
2
3
4
5
6
7
public class HashPartitioner<K, V> extends Partitioner<K, V> {

public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

}

Hadoop InputFormat 定义和默认实现

与 InputFormat 有关的类

TextInputFormat处理流程

MapReduce执行机制

MapReduce on Yarn



RM:

  • applications Manager 应用程序管理器
  • resource scheduler 资源memory+cpu调度器

红色框是什么?

  • container 虚拟的概念 属于NM节点上,专门用来MR、spark等技术的最小单元
  • map task
  • reduce task

图中流程

  1. 用户向Yarn提交应用程序(job app application),jar文件、sql;其中包裹ApplicationMaster程序、启动ApplicationMaster的命令等等
  2. RM为该job分配第一个container,运行job的ApplicationMaster
  3. App Master向applications Manager注册,这样就可以在RM WEB界面查询这个job的运行状态
  4. App Master采用轮询的方式通过RPC协议向RM申请和领取资源
  5. 一旦App Master拿到资源,就对应的与NM通信,要求启动任务
  6. NM为任务设置好运行环境(jar包等),将任务启动命令写在一个脚本里。并通过该脚本启动任务 task
  7. 各个task通过rpc协议向App Master汇报自己的状态和进度,以此让App Master随时掌握各个task的运行状态。从而在task运行失败重启任务。
  8. App Master向applications Manager注销且关闭自己

MapReduce 容错性

  • Case 1. 如果Task运行失败
    • Map Task失败
      • MRAppMaster重启Map Task,Map Task没有依赖性
    • Reduce Task失败
      • MRAppMaster重启Reduce Task,Map Task的输出保存在磁盘上
    • 同一个Task运行多次失败(默认4次)则本次作业失败
  • Case 2. 如果Task所在的Node节点挂了
    • 在另外一个节点上重启所有在挂掉节点上曾经运行过的任务
  • Case 3. 如果Task运行缓慢
    • 通常由于硬件损坏、软件Bug或者配置错误导致
    • 单个task运行缓慢会显著影响整体作业运行时间
    • 解决方案:推测执行
      • 在另外一个节点上启动相同的任务,谁先完成就kill掉另外一个节点
        上的任务
    • 无法启动推测执行的情况:写入数据库

MapReduce 数据本地性问题

  • 在集群中网络资源是一种稀缺资源
  • 文件在HDFS上存储在不同的DataNode节点上
  • 如果Map Task任务从远程机器上拷贝数据会消耗大量的网络带宽

  • HDFS上同一份文件会有多份拷贝(默认是3份)
  • MapReduce调度原则
    • 在包含副本的节点上启动Map Task任务
    • 或者在就近的节点上启动Map Task任务
  • 因此数据本地性有三个级别
    • Node Local
      • Map Task和数据在同一个节点上
    • Rack Local
      • Map Task和数据在同一个机架上
    • Different Rack
      • Map Task和数据即不再同一个节点又不在同一个机架上